
Apache Airflow

Metrics

Instrumentation for the Official Apache Helm Chart

If you are using the official Apache Helm chart, make sure the statsd exporter is enabled with the proper service annotations. If it is not, you can enable it by adding the following snippet to the values.yaml file of your Airflow deployment:

```yaml
statsd:
  enabled: true
  service:
    extraAnnotations:
      prometheus.io/port: "9102"
      prometheus.io/scrape: "true"
```

You will also need to add the following extraMappings snippet under the statsd configuration in the values.yaml file of your Airflow deployment:

```yaml
statsd:
  extraMappings:
    - match: "(.+)\\.(.+)_start$"
      match_metric_type: counter
      name: "af_agg_job_start"
      match_type: regex
      labels:
        airflow_id: "$1"
        job_name: "$2"
    - match: "(.+)\\.(.+)_end$"
      match_metric_type: counter
      name: "af_agg_job_end"
      match_type: regex
      labels:
        airflow_id: "$1"
        job_name: "$2"
    - match: "(.+)\\.operator_failures_(.+)$"
      match_metric_type: counter
      name: "af_agg_operator_failures"
      match_type: regex
      labels:
        airflow_id: "$1"
        operator_name: "$2"
    - match: "(.+)\\.operator_successes_(.+)$"
      match_metric_type: counter
      name: "af_agg_operator_successes"
      match_type: regex
      labels:
        airflow_id: "$1"
        operator_name: "$2"
    - match: "*.ti_failures"
      match_metric_type: counter
      name: "af_agg_ti_failures"
      labels:
        airflow_id: "$1"
    - match: "*.ti_successes"
      match_metric_type: counter
      name: "af_agg_ti_successes"
      labels:
        airflow_id: "$1"
    - match: "*.zombies_killed"
      match_metric_type: counter
      name: "af_agg_zombies_killed"
      labels:
        airflow_id: "$1"
    - match: "*.scheduler_heartbeat"
      match_metric_type: counter
      name: "af_agg_scheduler_heartbeat"
      labels:
        airflow_id: "$1"
    - match: "*.dag_processing.processes"
      match_metric_type: counter
      name: "af_agg_dag_processing_processes"
      labels:
        airflow_id: "$1"
    - match: "*.scheduler.tasks.killed_externally"
      match_metric_type: counter
      name: "af_agg_scheduler_tasks_killed_externally"
      labels:
        airflow_id: "$1"
    - match: "*.scheduler.tasks.running"
      match_metric_type: counter
      name: "af_agg_scheduler_tasks_running"
      labels:
        airflow_id: "$1"
    - match: "*.scheduler.tasks.starving"
      match_metric_type: counter
      name: "af_agg_scheduler_tasks_starving"
      labels:
        airflow_id: "$1"
    - match: "*.scheduler.orphaned_tasks.cleared"
      match_metric_type: counter
      name: "af_agg_scheduler_orphaned_tasks_cleared"
      labels:
        airflow_id: "$1"
    - match: "*.scheduler.orphaned_tasks.adopted"
      match_metric_type: counter
      name: "af_agg_scheduler_orphaned_tasks_adopted"
      labels:
        airflow_id: "$1"
    - match: "*.scheduler.critical_section_busy"
      match_metric_type: counter
      name: "af_agg_scheduler_critical_section_busy"
      labels:
        airflow_id: "$1"
    - match: "*.sla_email_notification_failure"
      match_metric_type: counter
      name: "af_agg_sla_email_notification_failure"
      labels:
        airflow_id: "$1"
    - match: "*.ti.start.*.*"
      match_metric_type: counter
      name: "af_agg_ti_start"
      labels:
        airflow_id: "$1"
        dag_id: "$2"
        task_id: "$3"
    - match: "*.ti.finish.*.*.*"
      match_metric_type: counter
      name: "af_agg_ti_finish"
      labels:
        airflow_id: "$1"
        dag_id: "$2"
        task_id: "$3"
        state: "$4"
    - match: "*.dag.callback_exceptions"
      match_metric_type: counter
      name: "af_agg_dag_callback_exceptions"
      labels:
        airflow_id: "$1"
    - match: "*.celery.task_timeout_error"
      match_metric_type: counter
      name: "af_agg_celery_task_timeout_error"
      labels:
        airflow_id: "$1"

    # === Gauges ===
    - match: "*.dagbag_size"
      match_metric_type: gauge
      name: "af_agg_dagbag_size"
      labels:
        airflow_id: "$1"
    - match: "*.dag_processing.import_errors"
      match_metric_type: gauge
      name: "af_agg_dag_processing_import_errors"
      labels:
        airflow_id: "$1"
    - match: "*.dag_processing.total_parse_time"
      match_metric_type: gauge
      name: "af_agg_dag_processing_total_parse_time"
      labels:
        airflow_id: "$1"
    - match: "*.dag_processing.last_runtime.*"
      match_metric_type: gauge
      name: "af_agg_dag_processing_last_runtime"
      labels:
        airflow_id: "$1"
        dag_file: "$2"
    - match: "*.dag_processing.last_run.seconds_ago.*"
      match_metric_type: gauge
      name: "af_agg_dag_processing_last_run_seconds"
      labels:
        airflow_id: "$1"
        dag_file: "$2"
    - match: "*.dag_processing.processor_timeouts"
      match_metric_type: gauge
      name: "af_agg_dag_processing_processor_timeouts"
      labels:
        airflow_id: "$1"
    - match: "*.executor.open_slots"
      match_metric_type: gauge
      name: "af_agg_executor_open_slots"
      labels:
        airflow_id: "$1"
    - match: "*.executor.queued_tasks"
      match_metric_type: gauge
      name: "af_agg_executor_queued_tasks"
      labels:
        airflow_id: "$1"
    - match: "*.executor.running_tasks"
      match_metric_type: gauge
      name: "af_agg_executor_running_tasks"
      labels:
        airflow_id: "$1"
    - match: "*.pool.open_slots.*"
      match_metric_type: gauge
      name: "af_agg_pool_open_slots"
      labels:
        airflow_id: "$1"
        pool_name: "$2"
    - match: "*.pool.queued_slots.*"
      match_metric_type: gauge
      name: "af_agg_pool_queued_slots"
      labels:
        airflow_id: "$1"
        pool_name: "$2"
    - match: "*.pool.running_slots.*"
      match_metric_type: gauge
      name: "af_agg_pool_running_slots"
      labels:
        airflow_id: "$1"
        pool_name: "$2"
    - match: "*.pool.starving_tasks.*"
      match_metric_type: gauge
      name: "af_agg_pool_starving_tasks"
      labels:
        airflow_id: "$1"
        pool_name: "$2"
    - match: "*.smart_sensor_operator.poked_tasks"
      match_metric_type: gauge
      name: "af_agg_smart_sensor_operator_poked_tasks"
      labels:
        airflow_id: "$1"
    - match: "*.smart_sensor_operator.poked_success"
      match_metric_type: gauge
      name: "af_agg_smart_sensor_operator_poked_success"
      labels:
        airflow_id: "$1"
    - match: "*.smart_sensor_operator.poked_exception"
      match_metric_type: gauge
      name: "af_agg_smart_sensor_operator_poked_exception"
      labels:
        airflow_id: "$1"
    - match: "*.smart_sensor_operator.exception_failures"
      match_metric_type: gauge
      name: "af_agg_smart_sensor_operator_exception_failures"
      labels:
        airflow_id: "$1"
    - match: "*.smart_sensor_operator.infra_failures"
      match_metric_type: gauge
      name: "af_agg_smart_sensor_operator_infra_failures"
      labels:
        airflow_id: "$1"

    # === Timers ===
    - match: "*.dagrun.dependency-check.*"
      match_metric_type: observer
      name: "af_agg_dagrun_dependency_check"
      labels:
        airflow_id: "$1"
        dag_id: "$2"
    - match: "*.dag.*.*.duration"
      match_metric_type: observer
      name: "af_agg_dag_task_duration"
      labels:
        airflow_id: "$1"
        dag_id: "$2"
        task_id: "$3"
    - match: "*.dag_processing.last_duration.*"
      match_metric_type: observer
      name: "af_agg_dag_processing_duration"
      labels:
        airflow_id: "$1"
        dag_file: "$2"
    - match: "*.dagrun.duration.success.*"
      match_metric_type: observer
      name: "af_agg_dagrun_duration_success"
      labels:
        airflow_id: "$1"
        dag_id: "$2"
    - match: "*.dagrun.duration.failed.*"
      match_metric_type: observer
      name: "af_agg_dagrun_duration_failed"
      labels:
        airflow_id: "$1"
        dag_id: "$2"
    - match: "*.dagrun.schedule_delay.*"
      match_metric_type: observer
      name: "af_agg_dagrun_schedule_delay"
      labels:
        airflow_id: "$1"
        dag_id: "$2"
    - match: "*.scheduler.critical_section_duration"
      match_metric_type: observer
      name: "af_agg_scheduler_critical_section_duration"
      labels:
        airflow_id: "$1"
    - match: "*.dagrun.*.first_task_scheduling_delay"
      match_metric_type: observer
      name: "af_agg_dagrun_first_task_scheduling_delay"
      labels:
        airflow_id: "$1"
        dag_id: "$2"
```
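To see how the regex mappings extract label values, the sketch below approximates the first mapping with POSIX tools (the exporter itself evaluates Go RE2 regexes, and the metric name here is just a made-up sample):

```shell
# A raw StatsD metric name as Airflow emits it: <airflow_id>.<job_name>_start
raw="airflow.scheduler_job_start"

# The first extraMapping's pattern: group 1 becomes airflow_id, group 2 job_name
echo "$raw" | grep -Eq '(.+)\.(.+)_start$' && echo "maps to af_agg_job_start"

# Print the label values the exporter would capture from the two groups
echo "$raw" | sed -E 's/(.+)\.(.+)_start$/airflow_id=\1 job_name=\2/'
```

Mappings without match_type: regex (for example `*.ti_failures`) use the exporter's default glob matching instead, where each `*` captures a single dot-separated segment.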

The Airflow Cluster Dashboard can be added to your Grafana instance for visualization.

Instrumentation for the Community Helm Chart

If you are using the community Helm chart, you can enable metrics instrumentation with either of the methods below.

Configuring the Airflow Exporter for Metrics

Using airflow-exporter, you can enable metrics by setting the following in the values.yaml file of your Airflow deployment:

```yaml
airflow:
  extraPipPackages: ["airflow-exporter"]
```

and

```yaml
web:
  service:
    annotations:
      prometheus.io/path: /admin/metrics
      prometheus.io/port: "8080"
      prometheus.io/scrape: "true"
```


Configuring OpenTelemetry for Metrics

You can use OpenTelemetry to instrument Airflow metrics by setting the following in the values.yaml file of your Airflow deployment:

```yaml
airflow:
  extraPipPackages:
    - "apache-airflow[otel]"
  config:
    AIRFLOW__METRICS__OTEL_ON: "True"
    AIRFLOW__METRICS__OTEL_HOST: "<otel_collector_service_name>.<namespace>.svc.cluster.local"
    AIRFLOW__METRICS__OTEL_PORT: 4318
    AIRFLOW__METRICS__OTEL_PREFIX: "airflow"
```
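Airflow reads any environment variable named AIRFLOW__{SECTION}__{KEY} as the option key under [section] in airflow.cfg, so the entries above are equivalent to setting otel_on, otel_host, and so on under [metrics]. A quick illustration of the naming convention:

```shell
# AIRFLOW__<SECTION>__<KEY> maps to <key> under [<section>] in airflow.cfg
var="AIRFLOW__METRICS__OTEL_HOST"

# Split on the double underscore; single underscores inside the key are kept
section=$(echo "$var" | awk -F'__' '{print tolower($2)}')
key=$(echo "$var" | awk -F'__' '{print tolower($3)}')

echo "[$section] $key"   # -> [metrics] otel_host
```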

For more metrics configuration options, refer to the Airflow OTel metrics documentation.

Update your Airflow deployment using the helm upgrade command, and you should see metrics arriving in your Grafana.
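The upgrade step might look like the sketch below; the release name, chart reference, and namespace are placeholders, so substitute the values from your original helm install:

```shell
# Re-apply the release with the updated values.yaml
# (release name, chart, and namespace below are placeholders)
helm upgrade airflow airflow-stable/airflow \
  --namespace airflow \
  --values ./values.yaml
```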


Traces

OpenTelemetry Instrumentation for the Community Airflow Helm Chart

If you are using the community Helm chart, you can configure your Airflow instance to send traces to your Grafana.

info

Tracing can only be configured on Airflow version 2.10.1 and above; earlier versions do not support trace instrumentation.

To configure tracing, add the traces configuration and the apache-airflow[otel] package by updating values.yaml as follows:

```yaml
airflow:
  extraPipPackages:
    - "apache-airflow[otel]"
  config:
    AIRFLOW__TRACES__OTEL_ON: "True"
    AIRFLOW__TRACES__OTEL_HOST: "<otel_collector_service_name>.<namespace>.svc.cluster.local"
    AIRFLOW__TRACES__OTEL_PORT: 4318
    AIRFLOW__TRACES__OTEL_TASK_LOG_EVENT: "True"
    AIRFLOW__TRACES__OTEL_SERVICE: "airflow"
```

For more trace configuration options, you can check the Airflow traces documentation.

warning

If you set otel_debugging_on to True, Airflow will print traces to the console instead of sending them to the configured host.

Update your Airflow deployment using the helm upgrade command, and you should see traces arriving in your Grafana.